package com.ruoyi.web.controller.tool;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.opencsv.CSVReader;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.kepServer.aircompressor.domain.Aircompressor;
import com.ruoyi.kepServer.aircompressor.mapper.AircompressorMapper;
import com.ruoyi.kepServer.aircompressor.service.IAircompressorService;
import com.ruoyi.kepServer.aircompressor.service.impl.AircompressorServiceImpl;
import com.ruoyi.kepServer.centralpumphouse.domain.Centralpumphouse;
import com.ruoyi.kepServer.centralpumphouse.mapper.CentralpumphouseMapper;
import com.ruoyi.kepServer.centralpumphouse.service.impl.CentralpumphouseServiceImpl;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo {
    private final static String endPointUrl = "opc.tcp://127.0.0.1:49320";
    //private final static String endPointUrl = "opc.tcp://192.168.5.153:45321";
    private final  static String endPointUrlBelt = "opc.tcp://192.168.5.60:4862";
    // 假设我们有一个要订阅的节点列表
    private  List<String> keyToSubscribe = new ArrayList<>();
    private  Centralpumphouse centralpumphouse=new Centralpumphouse();
    /**
     * 创建OPC UA客户端
     */
    private static OpcUaClient createClient() throws Exception {
        //opc ua服务端地址
        Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
        Files.createDirectories(securityTempDir);
        if (!Files.exists(securityTempDir)) {
            throw new Exception("unable to create security dir: " + securityTempDir);
        }
        return OpcUaClient.create(endPointUrl, endpoints -> endpoints.stream().filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri())).findFirst(), configBuilder -> configBuilder.setApplicationName(LocalizedText.english("eclipse milo opc-ua client")).setApplicationUri("urn:eclipse:milo:examples:client")
                //访问方式
                //.setIdentityProvider(new AnonymousProvider())
                .setIdentityProvider(new UsernameProvider("ua", "cdtu0123456789"))
                .setRequestTimeout(UInteger.valueOf(500)).build());
    }
    /**
     * 创建OPC UA客户端
     */
    private static OpcUaClient createClientBelt() throws Exception {
        Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
        Files.createDirectories(securityTempDir);
        if (!Files.exists(securityTempDir)) {
            throw new Exception("unable to create security dir: " + securityTempDir);
        }
        return OpcUaClient.create(endPointUrlBelt,
                endpoints ->
                        endpoints.stream()
                                .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
                                .findFirst(),
                configBuilder ->
                        configBuilder
                                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
                                .setApplicationUri("urn:eclipse:milo:examples:client")
                                //访问方式
                                .setIdentityProvider(new AnonymousProvider())
                                .setRequestTimeout(UInteger.valueOf(5000))
                                .build()
        );
    }
    /**
     * 遍历树形节点
     *
     * @param client OPC UA客户端
     * @param uaNode 节点
     * @throws Exception
     */
    private  void browseNode(OpcUaClient client, UaNode uaNode) throws Exception {
        List<? extends UaNode> nodes;
        if (uaNode == null) {
            nodes = client.getAddressSpace().browseNodes(Identifiers.ObjectsFolder);
        } else {
            nodes = client.getAddressSpace().browseNodes(uaNode);
        }
        for (UaNode nd : nodes) {
            //排除系统行性节点，这些系统性节点名称一般都是以"_"开头
            if (Objects.requireNonNull(nd.getBrowseName().getName()).contains("_")) {
                continue;
            }
            System.out.println("Node= " + nd.getBrowseName().getName());
            browseNode(client, nd);
        }
    }

    /**
     * 读取节点数据
     *
     * @param client OPC UA客户端
     * @throws Exception
     */
    private  void readNode(OpcUaClient client) throws Exception {
        int namespaceIndex = 2;
        String identifier = "通道 1.设备 1.标记 1";
        //节点
        NodeId nodeId = new NodeId(namespaceIndex, identifier);
        //读取节点数据
        DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
        //标识符
        identifier = String.valueOf(nodeId.getIdentifier());
        System.out.println(identifier + ": " + String.valueOf(value.getValue().getValue()));
    }

    /**
     * 写入节点数据
     *
     * @param client
     * @throws Exception
     */
    private  void writeNodeValue(OpcUaClient client) throws Exception {
        //节点
        NodeId nodeId = new NodeId(2, "通道 1.设备 1.标记 4");
        Short i = 48;
        //创建数据对象,此处的数据对象一定要定义类型，不然会出现类型错误，导致无法写入
        DataValue nowValue = new DataValue(new Variant(i), null, null);
        //写入节点数据
        StatusCode statusCode = client.writeValue(nodeId, nowValue).join();
        System.out.println("结果：" + statusCode.isGood());
    }

    /**
     * 订阅(单个)
     *
     * @param client
     * @throws Exception
     */
    private static void subscribe(OpcUaClient client) throws Exception {
        AtomicInteger a = new AtomicInteger();
        //创建发布间隔1000ms的订阅对象
        client.getSubscriptionManager().createSubscription(1000.0).thenAccept(t -> {
            //节点
            NodeId nodeId = new NodeId(2, "电力系统.井下变电所高压一.G10高开.CMU02_pcu04_Analog0");
//            NodeId nodeId = new NodeId(1, "t|NO1_40001B00");
            System.out.println(nodeId);
            ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
            //创建监控的参数
            MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(a.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
            //创建监控项请求
            //该请求最后用于创建订阅。
            MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
            List<MonitoredItemCreateRequest> requests = new ArrayList<>();
            requests.add(request);
            //创建监控项，并且注册变量值改变时候的回调函数。
            t.createMonitoredItems(TimestampsToReturn.Both, requests, (item, id) -> item.setValueConsumer((it, val) -> {
                System.out.println("nodeid :" + it.getReadValueId().getNodeId());
                System.out.println("value :" + val.getValue().getValue());
            }));
        }).get();

        //持续订阅
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 批量订阅
     *
     * @param client
     * @throws Exception
     */
//  private  void managedSubscriptionEvent(OpcUaClient client) throws Exception {
//      final CountDownLatch eventLatch = new CountDownLatch(1);
//
//      //处理订阅业务
//      handlerNode(client);
//
//      //持续监听
//      eventLatch.await();
//  }

    /**
     * 处理订阅业务
     *
     * @param client OPC UA客户端
     */
    private static void handlerNode(OpcUaClient client) {
        try {
            //创建订阅
            ManagedSubscription subscription = ManagedSubscription.create(client);
            //第几个进程,第几个订阅
            //你所需要订阅的key  格式（n=***;s=***）
            List<String> key = new ArrayList<>();
            //获得tag name
            key.add("煤矿1.皮带.NO1_40001B00");
            List<NodeId> nodeIdList = new ArrayList<>();
            for (String s : key) {
                nodeIdList.add(new NodeId(1, s));
            }
            System.out.println(nodeIdList.size());
            System.out.println("############");
            //监听
            List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
            System.out.println(dataItemList);
            for (ManagedDataItem managedDataItem : dataItemList) {
                managedDataItem.addDataValueListener((t) -> {
                    System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获得key名称
     *
     * @param  path csv位置
     */
    private static List<String> getCsvName(String path) {
        String csvFile = path;
        String [] line ;
        List<String> column1List = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(csvFile), "GBK"));
             CSVReader csvReader = new CSVReader(reader)) {
            // 读取文件头行
            String[] header = csvReader.readNext();
            // 找到第一列
            int column1Index =-1;
            for (int i = 0; i < header.length; i++) {
                if (header[i].equals("Tag Name")) {
                    column1Index = i;
                }
            }
            // 读取每一行数据
            while ((line = csvReader.readNext()) != null) {
                // 提取第三列和第四列的数据
                if (column1Index != -1) {
                    column1List.add(line[column1Index]);
                }

            }
        } catch (Exception e) {

            e.printStackTrace();

        }
        return column1List;
    }
    /**
     * 自定义订阅监听
     */
    private static class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {

        private OpcUaClient client;

        CustomSubscriptionListener(OpcUaClient client) {
            this.client = client;
        }

        public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
            System.out.println("onKeepAlive");
        }

        public void onStatusChanged(UaSubscription subscription, StatusCode status) {
            System.out.println("onStatusChanged");
        }

        public void onPublishFailure(UaException exception) {
            System.out.println("onPublishFailure");
        }

        public void onNotificationDataLost(UaSubscription subscription) {
            System.out.println("onNotificationDataLost");
        }

        /**
         * 重连时 尝试恢复之前的订阅失败时 会调用此方法
         * @param uaSubscription 订阅
         * @param statusCode 状态
         */
        public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
            System.out.println("恢复订阅失败 需要重新订阅");
            //在回调方法中重新订阅
            handlerNode(client);
        }
    }

    /**
     * 批量订阅
     *
     * @param client
     * @throws Exception
     */
    private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
        final CountDownLatch eventLatch = new CountDownLatch(1);

        //添加订阅监听器，用于处理断线重连后的订阅问题
        client.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener(client));

        //处理订阅业务
        handlerNode(client);

        //持续监听
        eventLatch.await();
    }
    public static OpcUaClient createClientNewEndpoint(String endPointUrl, String username, String password) {
        System.out.println(endPointUrl);
        IdentityProvider identityProvider = new AnonymousProvider();
        if (!StringUtils.isEmpty(username) || !StringUtils.isEmpty(password)) {
            identityProvider = new UsernameProvider(username, password);
        }
        try {
            Function<List<EndpointDescription>, Optional<EndpointDescription>> selectEndpoint = endpoints -> {
                final Optional<EndpointDescription> endpoint = endpoints
                        .stream()
                        //SecurityPolicy.Basic256
                        .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
                        .findFirst();
                EndpointDescription newEndpoint = new EndpointDescription(endPointUrl, endpoint.get().getServer(), endpoint.get().getServerCertificate(),
                        endpoint.get().getSecurityMode(), endpoint.get().getSecurityPolicyUri(), endpoint.get().getUserIdentityTokens(),
                        endpoint.get().getTransportProfileUri(), endpoint.get().getSecurityLevel());
                return Optional.of(newEndpoint);
            };
            IdentityProvider finalIdentityProvider = identityProvider;
            OpcUaClient opcClient = OpcUaClient.create(endPointUrl,
                    selectEndpoint,
                    configBuilder -> configBuilder
                            .setApplicationName(LocalizedText.english("plc"))
                            .setApplicationUri("urn:eclipse:milo:examples:client")
                            //访问方式
                            .setIdentityProvider(finalIdentityProvider)
                            .setRequestTimeout(UInteger.valueOf(10000))
                            .build()
            );
            opcClient.connect().get();
            System.out.println("连接成功:success");
            return opcClient;
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("======== opc connection fail ========");
        }
        return null;
    }
    public static Object readValue(String identifier, OpcUaClient client) {
        System.out.println("点位数据：" + identifier);
        NodeId nodeId = new NodeId(2, identifier);
        DataValue value = null;
        try {
            value = client.readValue(0.0, TimestampsToReturn.Both, nodeId).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        if (Objects.nonNull(value) && Objects.nonNull(value.getValue()) && Objects.nonNull(value.getValue().getValue())) {
            return value.getValue().getValue();
        }
        return null;
    }
    public static void main(String[] args) throws Exception {
//        OpcUaClient client = createClient();
//        client.connect().get();
        //managedSubscriptionEvent(client);
        //subscribe(client);
        OpcUaClient client =createClientNewEndpoint(endPointUrl, "ua", "cdtu0123456789");
        subscribe(client);
//        Object value = readValue("煤矿.中央水泵房.BXL", client);
//        System.out.println("当前点位值: " + value);
//        Thread.sleep(Integer.MAX_VALUE);
    }

}
